package org.apache.celestial.connector;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.Assert;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * {@link IConnector} implementation that reads records from a single Kafka topic.
 *
 * <p>{@link #init(Properties)} starts a background task that subscribes to the topic named by
 * {@link ConnectorConstants#KAFKA_TOPIC} and pushes every polled record into a bounded
 * in-memory buffer. {@link #getData()} blocks until a record is available and returns its
 * value. When the buffer is full the polling task blocks until a consumer of
 * {@link #getData()} frees a slot.
 */
public class KafkaConnector implements IConnector {

    /** Maximum number of records held in the in-memory buffer. */
    private static final int CAPACITY = 1024;

    /** Upper bound on how long a single {@code poll} call waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    /** Hand-off buffer between the polling task (producer) and {@link #getData()} (consumer). */
    private final BlockingDeque<ConsumerRecord<String, String>> datas = new LinkedBlockingDeque<>(CAPACITY);

    /**
     * Manual smoke test: builds a consumer configuration and starts the connector.
     *
     * @param args unused
     * @throws Exception if {@link #init(Properties)} fails
     */
    public static void main(String[] args) throws Exception {
        IConnector<String> connector = new KafkaConnector();

        // NOTE(review): the broker address and the SASL username/password below are hard-coded
        // in source. Move them to external configuration and rotate the password.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.205:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-stream-group-1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"Kunfei@1405#\";");
        props.put(ConnectorConstants.KAFKA_TOPIC, "cloud_event_iot_raw_data");
        connector.init(props);
    }

    /**
     * Validates the configuration and starts the background polling task.
     *
     * <p>The task creates a {@link KafkaConsumer} from {@code props}, subscribes to the
     * configured topic and keeps polling until its thread is interrupted; the consumer is
     * closed when the task ends.
     *
     * @param props Kafka consumer configuration; must also contain a non-blank
     *              {@link ConnectorConstants#KAFKA_TOPIC} entry
     * @throws IllegalArgumentException if the topic entry is missing or blank
     * @throws IOException declared by the interface contract; not thrown by this implementation
     * @throws InterruptedException declared by the interface contract; not thrown by this implementation
     */
    @Override
    public void init(Properties props) throws IOException, InterruptedException {
        // Validate the same value that is actually used, so a non-String or blank topic
        // fails here instead of later inside the background thread.
        String topic = props.getProperty(ConnectorConstants.KAFKA_TOPIC);
        Assert.hasText(topic, "please provide the kafka topic!");

        new SimpleAsyncTaskExecutor("kafka-connector-init").execute(() -> {
            // try-with-resources guarantees the consumer is closed on every exit path.
            try (KafkaConsumer<String, String> kc = new KafkaConsumer<>(props)) {
                kc.subscribe(Collections.singletonList(topic));

                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, String> crs = kc.poll(POLL_TIMEOUT);
                    for (ConsumerRecord<String, String> cr : crs) {
                        // Blocks while the buffer is full (back-pressure on the poll loop).
                        datas.put(cr);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for buffer space: stop polling and keep the
                // interrupt status visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * Blocks until a record is available and returns its value.
     *
     * @return the value of the next buffered record, or {@code null} if the calling thread
     *         was interrupted while waiting (its interrupt status is restored in that case)
     */
    @Override
    public Object getData() {
        try {
            return datas.take().value();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
